package com.example.ignitedemo.controller;

import lombok.extern.slf4j.Slf4j;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteEvents;
import org.apache.ignite.IgniteMessaging;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;


/**
 * @author ：lichuankang
 * @date ：2021/10/25 14:45
 * @description ：ignite控制器
 */
@RestController
@Slf4j
public class IgniteController {

    private final Ignite ignite;

    private final IgniteMessaging rmtMsg;

    private final IgniteEvents events;

    /**
     * 启动时获取所有节点信息
     */
    private Map<String, ClusterNode> clusterGroupMap;

    public IgniteController(Ignite ignite) {
        this.ignite = ignite;
        clusterGroupMap = ignite.cluster().forRemotes().nodes().stream()
                .collect(Collectors.toMap(clusterNode -> clusterNode.id().toString(),
                                          Function.identity()));
        this.rmtMsg = ignite.message(ignite.cluster().forRemotes());

        // Add listener for ordered messages on all remote nodes.
        //所有集群都监听该topic
        //集群内的所有节点上注册了一个带有指定主题的监听器然后监听来自集群内任意节点的消息
        rmtMsg.localListen("topic", (nodeId, msg) -> {
            log.info("收到集群有序消息 [msg=" + msg + ", from=" + nodeId + ']');
            //处理消息。本地只投递消息。根据用户id投递消息。消息存储由发起方节点处理。
            return true; // Return true to continue listening.
        });
        this.events = ignite.events();

        //闭包 集群发现事件。
        UUID uuid = events.remoteListen((IgniteBiPredicate<UUID, DiscoveryEvent>) (uuid1, e) -> {
            clusterGroupMap = e.topologyNodes().stream()
                    .collect(Collectors.toMap(clusterNode -> clusterNode.id().toString(),
                                              Function.identity()));
            log.info("节点非正常离开集群就关闭监听。nodeID = " + e.node().id() + ", addresses=" + e.node().addresses());
            return true; //continue listening
        }, null, EventType.EVT_NODE_JOINED);

    }

    @GetMapping(value = "/igniteWrite")
    public String testIgniteWrite() {
        IgniteCache<String, String> cache = ignite.getOrCreateCache("myCache");
        cache.put("name", "this is name");
        cache.put("age", "24");
        return "success";
    }

    @GetMapping(value = "/igniteRead")
    public String igniteRead() {
        IgniteCache<String, String> cache = ignite.cache("myCache");
        return cache.get("name");
    }

    @GetMapping(value = "/write")
    public String igniteWriteByInput(String key, String value) {
        IgniteCache<String, String> cache = ignite.getOrCreateCache("myCache");
        cache.put(key, value);
        return "success";
    }

    @GetMapping(value = "/read")
    public String igniteWriteByRead(String key) {
        IgniteCache<String, String> cache = ignite.cache("myCache");
        return cache.get(key);
    }

    /**
     * 给其它服务节点推送消息
     *
     * @param msg
     * @return
     */
    @GetMapping(value = "/msg")
    public String getServer(String msg) {
        // Messaging instance over given cluster group (in this case, remote nodes).
        //发布带主题的有序消息
        rmtMsg.sendOrdered("topic", msg, 0);
        return "ok";
    }

}

